Conversation

@JiaqiWang18 (Contributor) commented Jul 18, 2025

What changes were proposed in this pull request?

Scopes DataflowGraphRegistry to the Spark Connect session by adding it as a member of the Spark Connect SessionHolder. SessionHolder is the natural home for it because pipeline executions are also scoped to that class.

Added getter/setter methods to access a session's dataflow graphs.

Added logic to drop all dataflow graphs when the session is closed.

Why are the changes needed?

Currently DataflowGraphRegistry is a singleton, but it should instead be scoped to a single SparkSession so that pipelines running on the same cluster are properly isolated from one another.

This also allows proper cleanup of pipeline resources when a session is closed.
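For illustration, here is a minimal sketch of what a per-session registry can look like. The class and method names follow the PR, but the ConcurrentHashMap-backed implementation and the simplified GraphRegistrationContext are assumptions for this sketch, not the actual Spark code:

```scala
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-in for the real GraphRegistrationContext.
case class GraphRegistrationContext(defaultCatalog: String, defaultDatabase: String)

// One instance per SessionHolder, instead of a process-wide singleton, so two
// sessions on the same cluster can never see each other's graphs.
class DataflowGraphRegistry {
  private val graphs = new ConcurrentHashMap[String, GraphRegistrationContext]()

  def createDataflowGraph(defaultCatalog: String, defaultDatabase: String): String = {
    val graphId = UUID.randomUUID().toString
    graphs.put(graphId, GraphRegistrationContext(defaultCatalog, defaultDatabase))
    graphId
  }

  def getDataflowGraph(graphId: String): Option[GraphRegistrationContext] =
    Option(graphs.get(graphId))

  // Called from session cleanup so graphs do not outlive the session.
  def dropAllDataflowGraphs(): Unit = graphs.clear()
}
```

Because the registry is owned by the session holder, dropping the session's graphs becomes a single `dropAllDataflowGraphs()` call during teardown.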

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added new test cases covering dataflow graph session isolation and proper cleanup.

Was this patch authored or co-authored using generative AI tooling?

No

@JiaqiWang18 changed the title from "[WIP] Spark 52432 session graph registry" to "[SPARK-52432][SDP][SQL] Scope DataflowGraphRegistry to Session" Jul 18, 2025
@JiaqiWang18 (Contributor, Author) commented:
@AnishMahto @sryza

@AnishMahto (Contributor) left a comment:


LGTM, just nits

Comment on lines 496 to 537
  private[connect] def createDataflowGraph(
      defaultCatalog: String,
      defaultDatabase: String,
      defaultSqlConf: Map[String, String]): String = {
    dataflowGraphRegistry.createDataflowGraph(defaultCatalog, defaultDatabase, defaultSqlConf)
  }

  /**
   * Retrieves the dataflow graph for the given graph ID.
   */
  private[connect] def getDataflowGraph(graphId: String): Option[GraphRegistrationContext] = {
    dataflowGraphRegistry.getDataflowGraph(graphId)
  }

  /**
   * Retrieves the dataflow graph for the given graph ID, throwing if not found.
   */
  private[connect] def getDataflowGraphOrThrow(graphId: String): GraphRegistrationContext = {
    dataflowGraphRegistry.getDataflowGraphOrThrow(graphId)
  }

  /**
   * Removes the dataflow graph with the given ID.
   */
  private[connect] def dropDataflowGraph(graphId: String): Unit = {
    dataflowGraphRegistry.dropDataflowGraph(graphId)
  }

  /**
   * Returns all dataflow graphs in this session.
   */
  private[connect] def getAllDataflowGraphs: Seq[GraphRegistrationContext] = {
    dataflowGraphRegistry.getAllDataflowGraphs
  }

  /**
   * Removes all dataflow graphs from this session. Called during session cleanup.
   */
  private[connect] def dropAllDataflowGraphs(): Unit = {
    dataflowGraphRegistry.dropAllDataflowGraphs()
  }

A reviewer (Contributor) commented:

Is there any particular reason why we added these delegator methods, rather than just having callers call SessionHolder.dataflowGraphRegistry.blah()?

If it's for access-modifier reasons, why not just do private[connect] lazy val dataflowGraphRegistry?
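For reference, the reviewer's alternative would look roughly like the sketch below. The registry stand-in here is simplified, and the field is left public so the sketch compiles on its own; in the real code it would carry the private[connect] modifier:

```scala
// Minimal stand-in for the per-session registry.
class DataflowGraphRegistry {
  private val graphs = scala.collection.concurrent.TrieMap.empty[String, String]

  def createDataflowGraph(): String = {
    val graphId = java.util.UUID.randomUUID().toString
    graphs.put(graphId, graphId)
    graphId
  }

  def getAllDataflowGraphs: Seq[String] = graphs.values.toSeq

  def dropAllDataflowGraphs(): Unit = graphs.clear()
}

class SessionHolder {
  // `private[connect] lazy val` in the suggested real code; callers would then
  // write sessionHolder.dataflowGraphRegistry.createDataflowGraph(...) directly
  // instead of going through one delegator method per registry operation.
  lazy val dataflowGraphRegistry = new DataflowGraphRegistry

  // Session teardown still needs only one call on the registry itself.
  def close(): Unit = dataflowGraphRegistry.dropAllDataflowGraphs()
}
```

The trade-off is fewer boilerplate methods on SessionHolder versus a slightly wider surface area exposed to callers inside the connect package.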

  def buildGraph(pythonText: String): DataflowGraph = {
    val indentedPythonText = pythonText.linesIterator.map(" " + _).mkString("\n")
    // create a unique identifier to allow identifying the session and dataflow graph
    val identifier = UUID.randomUUID().toString

nit: rename to something more descriptive like customSessionIdentifier

      throw new RuntimeException(
        s"Python process failed with exit code $exitCode. Output: ${output.mkString("\n")}")
    }
    val activateSessions = SparkConnectService.sessionManager.listActiveSessions

val activeSessions

    val dataflowGraphContexts = DataflowGraphRegistry.getAllDataflowGraphs
    // get the session holder by finding the session with the custom UUID set in the conf
    val sessionHolder = activateSessions
      .map(info => SparkConnectService.sessionManager.getIsolatedSessionIfPresent(info.key).get)

getIsolatedSession() instead of getIsolatedSessionIfPresent(...).get

Comment on lines 45 to 46
      .getIsolatedSessionIfPresent(SessionKey(defaultUserId, defaultSessionId))
      .getOrElse(throw new RuntimeException("Session not found"))

nit: just call getIsolatedSession

Comment on lines 444 to 449
    val graph1 = sessionHolder.getDataflowGraph(graphId1).getOrElse {
      fail(s"Graph with ID $graphId1 not found in session")
    }
    val graph2 = sessionHolder.getDataflowGraph(graphId2).getOrElse {
      fail(s"Graph with ID $graphId2 not found in session")
    }

nit: just call getDataflowGraphOrThrow

@sryza (Contributor) left a comment:


Nice

@sryza closed this in 0177265 Jul 21, 2025
@sryza commented Jul 21, 2025

Merged to master

haoyangeng-db pushed a commit to haoyangeng-db/apache-spark that referenced this pull request Jul 22, 2025
(The commit message repeats the pull request description above.)

Closes apache#51544 from JiaqiWang18/SPARK-52432-session-graphRegistry.

Authored-by: Jacky Wang <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>